package com.chenssy.client.core;

import com.chenssy.common.core.InstanceInfo;
import com.chenssy.common.core.Lease;
import com.chenssy.common.core.RecentlyChangedServiceInstance;
import com.chenssy.common.dto.*;
import com.chenssy.common.enums.InstanceChangedOperationEnum;
import com.chenssy.common.enums.ResponseStatusEnum;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicStampedReference;

/**
 * Registry-center client.
 *
 * <p>{@link #register()} registers this service instance through {@code HttpSender} and then
 * starts three background threads: a heartbeat sender, a one-shot full registry fetch, and a
 * periodic delta registry fetch that merges changes into the locally cached registry.
 *
 * @author:chenssy
 * @date:2021/7/6
 */
@Slf4j
public class RegisterClient {

    private static final String SERVICE_NAME = "study-client";

    private static final String IP = "127.0.123.101";

    private static final String HOST_NAME = "study-client";

    private static final Integer PORT = 8890;

    /** Pause between two heartbeats, in milliseconds (5 seconds). */
    private static final long HEARTBEAT_INTERVAL_MILLIS = 5 * 1000L;

    /** Pause between two delta registry fetches, in milliseconds (3 minutes). */
    private static final long FETCH_DELTA_INTERVAL_MILLIS = 3 * 60 * 1000L;

    /** Service instance id. */
    private String serviceInstanceId;

    /**
     * Whether the client is running. Written by the register thread and polled by the worker
     * loops, hence volatile.
     */
    private volatile boolean isRunning = false;

    /** Locally cached client registry; the stamp is incremented on every replacement. */
    private final AtomicStampedReference<ClientRegistry> registryRefrence =
            new AtomicStampedReference<>(new ClientRegistry(), 0);

    /** Version number of the client registry, bumped by each successful full fetch. */
    private final AtomicLong registryVersion = new AtomicLong(0L);

    /**
     * Creates a client for the given service instance.
     *
     * @param serviceInstanceId id sent with the register and heartbeat requests
     */
    public RegisterClient(String serviceInstanceId) {
        this.serviceInstanceId = serviceInstanceId;
    }

    /**
     * Registers this instance, waits for the registration attempt to finish, and then starts
     * the heartbeat, full-fetch and delta-fetch threads.
     *
     * @throws Exception if the calling thread is interrupted while waiting for registration
     */
    public void register() throws Exception {
        RegisterWork registerWork = new RegisterWork();
        registerWork.start();

        // Wait until the registration attempt has finished.
        registerWork.join();

        // Heartbeat thread.
        HearbeatWork hearbeatWork = new HearbeatWork();
        hearbeatWork.start();

        // Full registry fetch thread.
        FetchFullRegisterWorker pullRegister = new FetchFullRegisterWorker();
        pullRegister.start();

        // Delta registry fetch thread.
        FetchDeltaRegisterWorker fetchDeltaRegisterWorker = new FetchDeltaRegisterWorker();
        fetchDeltaRegisterWorker.start();
    }

    /**
     * Pulls the full registry and installs it as the local cache.
     *
     * <p>The registry version is read before the network request. The fetched data is only
     * installed if the version is still unchanged afterwards; otherwise another full fetch
     * completed in the meantime and this (possibly older) snapshot is discarded.
     */
    private void fetchFullRegistry() {
        // The version must be captured before the network request is issued.
        long expectedVersion = registryVersion.get();
        PullRegistryResponse registryResponse = HttpSender.pullRegister();

        if (!registryVersion.compareAndSet(expectedVersion, expectedVersion + 1)) {
            return;
        }

        ClientRegistry fetchedRegistry = new ClientRegistry(registryResponse.getRegistry());
        int[] stampHolder = new int[1];
        while (true) {
            // Read reference and stamp together so the CAS compares a consistent pair.
            ClientRegistry currentRegistry = registryRefrence.get(stampHolder);
            int currentStamp = stampHolder[0];
            if (registryRefrence.compareAndSet(
                    currentRegistry, fetchedRegistry, currentStamp, currentStamp + 1)) {
                return;
            }
        }
    }

    /**
     * Registration thread: sends one register request and marks the client as running when
     * the response code equals {@code ResponseStatusEnum.SUCCESS}.
     */
    private class RegisterWork extends Thread {
        @Override
        public void run() {
            RegisterRequest request = RegisterRequest.builder()
                    .serviceInstanceId(serviceInstanceId)
                    .serviceName(SERVICE_NAME)
                    .hostName(HOST_NAME)
                    .ip(IP)
                    .port(PORT)
                    .build();

            RegisterResponse resposne = HttpSender.register(request);

            if (ResponseStatusEnum.SUCCESS.getCode().equals(resposne.getCode())) {
                log.info("[RegisterWork] - {}-{},注册成功...",SERVICE_NAME,serviceInstanceId);
                isRunning = true;
            }
        }
    }

    /**
     * Heartbeat thread: sends a heartbeat every {@link #HEARTBEAT_INTERVAL_MILLIS} while the
     * client is running. Stops when interrupted.
     */
    private class HearbeatWork extends Thread {
        @Override
        public void run() {
            HearbeatRequest hearbeatRequest = new HearbeatRequest();
            hearbeatRequest.setServiceName(SERVICE_NAME);
            hearbeatRequest.setServiceInstanceId(serviceInstanceId);

            while (isRunning) {
                HearbeatResponse response = HttpSender.heartbear(hearbeatRequest);

                log.info("[HearbeatWork] - 发送心跳响应结果为:{}",response.getStatus());

                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of silently swallowing it.
                    log.warn("[HearbeatWork] - interrupted while sleeping, stopping heartbeat", e);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Full-fetch thread: pulls the full registry once and installs it as the local cache.
     */
    private class FetchFullRegisterWorker extends Thread {
        @Override
        public void run() {
            fetchFullRegistry();
        }
    }

    /**
     * Delta-fetch thread: every {@link #FETCH_DELTA_INTERVAL_MILLIS} while the client is
     * running, pulls the recently changed instances, merges them into the local registry and
     * falls back to a full fetch when the instance counts disagree. Stops when interrupted.
     */
    private class FetchDeltaRegisterWorker extends Thread {
        @Override
        public void run() {
            while (isRunning) {
                try {
                    TimeUnit.MILLISECONDS.sleep(FETCH_DELTA_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop the worker.
                    log.warn("[FetchDeltaRegisterWorker] - interrupted while sleeping, stopping delta fetch", e);
                    Thread.currentThread().interrupt();
                    return;
                }

                DeltaRegistry deltaRegistry = HttpSender.fetchDeltaPullRegister();

                mergeDeltaRegistry(deltaRegistry.getDeltaRegistryList());

                checkRegistryCount(deltaRegistry.getRegistryTotal());
            }
        }

        /**
         * Compares the instance total reported with the delta against the number of instances
         * in the local registry and triggers a full fetch when they differ.
         *
         * @param registryTotal instance total taken from the delta response
         */
        private void checkRegistryCount(Long registryTotal) {
            Map<String, Map<String, Lease<InstanceInfo>>> registry =
                    registryRefrence.getReference().getRegistry();

            long clientRegistryTotal = 0L;
            for (Map<String, Lease<InstanceInfo>> leaseMap : registry.values()) {
                clientRegistryTotal += leaseMap.size();
            }

            if (registryTotal.longValue() != clientRegistryTotal) {
                fetchFullRegistry();
            }
        }

        /**
         * Merges the changed instances into the locally cached registry.
         *
         * <p>A {@code register} change adds or replaces the instance's lease; any other
         * change removes the instance. A removal of an instance that is not present locally
         * is skipped, and the remaining changes are still applied.
         *
         * @param serviceInstanceList recently changed service instances from the delta response
         */
        private void mergeDeltaRegistry(List<RecentlyChangedServiceInstance> serviceInstanceList) {
            Map<String, Map<String, Lease<InstanceInfo>>> registry =
                    registryRefrence.getReference().getRegistry();

            synchronized (registry) {
                for (RecentlyChangedServiceInstance instance : serviceInstanceList) {
                    InstanceInfo instanceInfo = instance.getInstanceInfo();
                    String serviceName = instanceInfo.getServiceName();
                    String instanceId = instanceInfo.getServiceInstanceId();

                    Map<String, Lease<InstanceInfo>> leaseMap = registry.get(serviceName);

                    if (InstanceChangedOperationEnum.register.getOperation().equals(instance.getChangedOperation())) {
                        if (leaseMap == null) {
                            leaseMap = new HashMap<>();
                            registry.put(serviceName, leaseMap);
                        }
                        Lease<InstanceInfo> lease = new Lease<>(instanceInfo);
                        leaseMap.put(instanceId, lease);
                        continue;
                    }

                    // Not a register operation: drop the instance if it is known locally.
                    // Unknown services/instances are skipped so later changes still apply.
                    if (leaseMap == null) {
                        continue;
                    }
                    leaseMap.remove(instanceId);
                }
            }
        }
    }
}
